 1.Kafka源码剖析之消息存储机制
   
   消息存储在 log.dirs/<topic-name>-<partitionno>/ 目录下，每个日志分段对应 .index、.timeindex、.log 三个文件。
   首先查看Kafka如何处理生产的消息：
   调用副本管理器，将记录追加到分区的副本中。
   将数据追加到本地的Log日志中：
   追加消息的实现：
   遍历需要追加的每个主题分区的消息：
   调用partition的方法将记录追加到该分区的leader副本中：
   如果在本地找到了该分区的leader：
   执行下述逻辑将消息追加到leader分区：
// Excerpt of the leader-append path. `leaderReplica`, `inSyncReplicas`, `requiredAcks`,
// `records`, `isFromClient` and `topicPartition` are provided by the enclosing scope,
// which is not part of this excerpt.

// Log backing the local leader replica.
val log = leaderReplica.log.get
// Minimum in-sync replica count configured for this log.
val minIsr = log.config.minInSyncReplicas
// Current number of in-sync replicas.
val inSyncSize = inSyncReplicas.size

// Reject the append when the ISR is smaller than the configured minimum and the
// request was sent with acks = -1.
if (inSyncSize < minIsr && requiredAcks == -1) {
  throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
    .format(topicPartition, inSyncSize, minIsr))
}

// Append the records to the leader's log.
val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)

// Ask the replica manager to try completing delayed fetch requests keyed on this
// topic-partition. NOTE(review): presumably this unblocks follower fetches that were
// waiting for new data now that the leader's log has grown -- confirm in ReplicaManager.
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))

// Result of the excerpt: the append info paired with the outcome of
// maybeIncrementLeaderHW. NOTE(review): the original note says the high watermark has to
// advance here when the ISR contains a single member -- confirm against that method.
(info, maybeIncrementLeaderHW(leaderReplica))
   log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient) 的实现：
   具体代码的实现：
/**
 * Excerpt: body of the log append routine. `records`, `isFromClient`, `assignOffsets` and
 * `leaderEpoch` come from the enclosing method, whose header is not part of this excerpt.
 *
 * Per the original notes, analyzeAndValidateRecords verifies that:
 *  - each message matches its CRC
 *  - each message has a valid size in bytes
 *  - the sequence numbers of the incoming batches are consistent with the existing
 *    state and with each other
 *
 * and computes:
 *  - the first offset in the message set
 *  - the last offset in the message set
 *  - the number of messages
 *  - the number of valid bytes
 *  - whether the offsets are monotonically increasing
 *  - whether a compression codec is used (if several are used, the last one is reported)
 */
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

// Nothing to append when the shallow count is zero. The original note adds that this
// also covers a message set that duplicates the previous one.
if (appendInfo.shallowCount == 0)
  return appendInfo

// Trim invalid bytes or partial messages before appending to the on-disk log.
var validRecords = trimInvalidBytes(records, appendInfo)

// What remains is valid; insert it into the log while holding the log lock.
lock synchronized {
  // NOTE(review): presumably throws if the memory-mapped buffers were already closed,
  // so that no write is attempted on a closed log -- confirm in the helper.
  checkIfMemoryMappedBufferClosed()

  if (assignOffsets) {
    // Offsets have to be assigned to the messages, starting at the log's next offset.
    val offset = new LongRef(nextOffsetMetadata.messageOffset)
    appendInfo.firstOffset = offset.value
    val now = time.milliseconds
    val validateAndOffsetAssignResult = try {
      // Validate the messages and assign their offsets.
      LogValidator.validateMessagesAndAssignOffsets(validRecords,
        offset,
        time,
        now,
        appendInfo.sourceCodec,
        appendInfo.targetCodec,
        config.compact,
        config.messageFormatVersion.messageFormatVersion.value,
        config.messageTimestampType,
        config.messageTimestampDifferenceMaxMs,
        leaderEpoch,
        isFromClient)
    } catch {
      case e: IOException =>
        throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
    }

    // Validated records (held in memory at this point).
    validRecords = validateAndOffsetAssignResult.validatedRecords
    // Largest timestamp among the records being appended.
    appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
    // Offset of the record carrying that largest timestamp.
    appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
    // `offset` now points one past the last assigned offset.
    appendInfo.lastOffset = offset.value - 1
    appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
    if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
      // With log-append-time timestamps, record the time captured above.
      appendInfo.logAppendTime = now

    // Re-check batch sizes if they may have changed, e.g. because of re-compression or
    // message format conversion.
    if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
      for (batch <- validRecords.batches.asScala) {
        // A batch larger than the configured maximum message size is rejected.
        if (batch.sizeInBytes > config.maxMessageSize) {
          // we record the original message set size instead of the trimmed size
          // to be consistent with pre-compression bytesRejectedRate recording
          brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
          throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
            .format(batch.sizeInBytes, config.maxMessageSize))
        }
      }
    }
  } else {
    // No offset assignment requested: use the offsets carried by the records.
    if (!appendInfo.offsetsMonotonic)
      // Offsets that are not monotonically increasing are rejected.
      throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
        records.records.asScala.map(_.offset))

    // The first offset of the append must not be below the log's next offset.
    if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
      // we may still be able to recover if the log is empty
      // one example: fetching from log start offset on the leader which is not batch aligned,
      // which may happen as a result of AdminClient#deleteRecords()
      // appendInfo.firstOffset maybe either first offset or last offset of the first batch.
      // get the actual first offset, which may require decompressing the data
      val firstOffset = records.batches.asScala.head.baseOffset()
      throw new UnexpectedAppendOffsetException(
        s"Unexpected offset in append to $topicPartition. First offset or last offset of the first batch " +
          s"${appendInfo.firstOffset} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
          s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
          s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
        firstOffset, appendInfo.lastOffset)
    }
  }

  // Update the leader epoch cache with the epoch stamped onto each batch by the leader.
  validRecords.batches.asScala.foreach { batch =>
    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
      // Record the batch's leader epoch together with its base offset.
      leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
  }

  // The whole record set must fit into a single log segment.
  if (validRecords.sizeInBytes > config.segmentSize) {
    throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
      .format(validRecords.sizeInBytes, config.segmentSize))
  }

  // The records are valid, offsets are assigned and timestamps are updated; now validate
  // the idempotent/transactional producer state and collect some metadata.
  val (updatedProducers, completedTxns, maybeDuplicate) =
    analyzeAndValidateProducerState(validRecords, isFromClient)

  // For a duplicate batch, report the offsets of the batch it duplicates and stop.
  // The `return` inside the closure leaves the enclosing method, not just the closure.
  maybeDuplicate.foreach { duplicate =>
    appendInfo.firstOffset = duplicate.firstOffset
    appendInfo.lastOffset = duplicate.lastOffset
    appendInfo.logAppendTime = duplicate.timestamp
    appendInfo.logStartOffset = logStartOffset
    return appendInfo
  }

  // Roll to a new segment if the current one is full.
  val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
    maxTimestampInMessages = appendInfo.maxTimestamp,
    maxOffsetInMessages = appendInfo.lastOffset)

  // Captured before the append, so the relative position is the segment size at that time.
  val logOffsetMetadata = LogOffsetMetadata(
    messageOffset = appendInfo.firstOffset,
    segmentBaseOffset = segment.baseOffset,
    relativePositionInSegment = segment.size)

  // Append the records to the segment.
  segment.append(firstOffset = appendInfo.firstOffset,
    largestOffset = appendInfo.lastOffset,
    largestTimestamp = appendInfo.maxTimestamp,
    shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
    records = validRecords)

  // Update the producer state.
  for ((producerId, producerAppendInfo) <- updatedProducers) {
    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
    producerStateManager.update(producerAppendInfo)
  }

  // update the transaction index with the true last stable offset. The last offset visible
  // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
  for (completedTxn <- completedTxns) {
    val lastStableOffset = producerStateManager.completeTxn(completedTxn)
    segment.updateTxnIndex(completedTxn, lastStableOffset)
  }

  // always update the last producer id map offset so that the snapshot reflects
  // the current offset
  // even if there isn't any idempotent data being written
  producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

  // Move the log end offset to one past the last appended offset.
  updateLogEndOffset(appendInfo.lastOffset + 1)

  // update the first unstable offset (which is used to compute LSO)
  updateFirstUnstableOffset()

  trace(s"Appended message set to log with last offset ${appendInfo.lastOffset} " +
    s"first offset: ${appendInfo.firstOffset}, " +
    s"next offset: ${nextOffsetMetadata.messageOffset}, " +
    s"and messages: $validRecords")

  // Flush once the number of unflushed messages reaches the configured flush interval.
  if (unflushedMessages >= config.flushInterval)
    flush()

  appendInfo
}
}
   